{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Exercise 4: Optimizing Redshift Table Design"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "%load_ext sql"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "from time import time\n",
    "import configparser\n",
    "import matplotlib.pyplot as plt\n",
    "import pandas as pd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load AWS credentials and warehouse settings from the local dwh.cfg file.\n",
    "config = configparser.ConfigParser()\n",
    "# A context manager closes the file handle once parsing is done; opening the\n",
    "# file explicitly still raises if dwh.cfg is missing.\n",
    "with open('dwh.cfg') as cfg_file:\n",
    "    config.read_file(cfg_file)\n",
    "\n",
    "KEY = config.get('AWS', 'key')\n",
    "SECRET = config.get('AWS', 'secret')\n",
    "\n",
    "DWH_DB = config.get(\"DWH\", \"DWH_DB\")\n",
    "DWH_DB_USER = config.get(\"DWH\", \"DWH_DB_USER\")\n",
    "DWH_DB_PASSWORD = config.get(\"DWH\", \"DWH_DB_PASSWORD\")\n",
    "DWH_PORT = config.get(\"DWH\", \"DWH_PORT\")"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# STEP 1: Get the params of the created redshift cluster \n",
    "- We need:\n",
    "    - The redshift cluster <font color='red'>endpoint</font>\n",
    "    - The <font color='red'>IAM role ARN</font> that gives Redshift access to read from S3"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [],
   "source": [
    "# FILL IN THE REDSHIFT ENDPOINT HERE\n",
    "# e.g. DWH_ENDPOINT=\"redshift-cluster-1.csmamz5zxmle.us-west-2.redshift.amazonaws.com\" \n",
    "DWH_ENDPOINT=\"dwhcluster.csmamz5zxmle.us-west-2.redshift.amazonaws.com\"\n",
    "    \n",
    "# FILL IN THE IAM ROLE ARN you got in step 2.2 of the previous exercise\n",
    "# e.g. DWH_ROLE_ARN=\"arn:aws:iam::988332130976:role/dwhRole\"\n",
    "DWH_ROLE_ARN=\"arn:aws:iam::988332130976:role/dwhRole\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# STEP 2: Connect to the Redshift Cluster"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "postgresql://dwhuser:***@dwhcluster.csmamz5zxmle.us-west-2.redshift.amazonaws.com:5439/dwh\n",
      "(psycopg2.OperationalError) could not translate host name \"dwhcluster.csmamz5zxmle.us-west-2.redshift.amazonaws.com\" to address: Name or service not known\n",
      "\n",
      "Connection info needed in SQLAlchemy format, example:\n",
      "               postgresql://username:password@hostname/dbname\n",
      "               or an existing connection: dict_keys([])\n"
     ]
    }
   ],
   "source": [
    "import os\n",
    "\n",
    "# SQLAlchemy-style URL that the sql magic uses to open the Redshift connection.\n",
    "conn_string = \"postgresql://{}:{}@{}:{}/{}\".format(DWH_DB_USER, DWH_DB_PASSWORD, DWH_ENDPOINT, DWH_PORT, DWH_DB)\n",
    "# Echo the target with the password masked so the secret is not stored in the saved cell output.\n",
    "print(\"postgresql://{}:***@{}:{}/{}\".format(DWH_DB_USER, DWH_ENDPOINT, DWH_PORT, DWH_DB))\n",
    "%sql $conn_string"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "toc-hr-collapsed": true
   },
   "source": [
    "# STEP 3: Create Tables\n",
    "- We are going to use a benchmarking data set common for benchmarking star schemas in data warehouses.\n",
    "- The data is pre-loaded in a public bucket on the `us-west-2` region\n",
    "- Our examples will be based on the Amazon Redshift tutorial but in a scripted environment in our workspace.\n",
    "\n",
    "![afa](https://docs.aws.amazon.com/redshift/latest/dg/images/tutorial-optimize-tables-ssb-data-model.png)\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3.1 Create tables (no distribution strategy) in the `nodist` schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "Environment variable $DATABASE_URL not set, and no connect string given.\n",
      "Connection info needed in SQLAlchemy format, example:\n",
      "               postgresql://username:password@hostname/dbname\n",
      "               or an existing connection: dict_keys([])\n"
     ]
    }
   ],
   "source": [
    "%%sql \n",
    "CREATE SCHEMA IF NOT EXISTS nodist;\n",
    "SET search_path TO nodist;\n",
    "\n",
    "-- Drop any previous copies so the cell can be re-run from a clean state\n",
    "DROP TABLE IF EXISTS part cascade;\n",
    "DROP TABLE IF EXISTS supplier;\n",
    "DROP TABLE IF EXISTS customer;\n",
    "DROP TABLE IF EXISTS dwdate;\n",
    "DROP TABLE IF EXISTS lineorder;\n",
    "\n",
    "CREATE TABLE part \n",
    "(\n",
    "  p_partkey     INTEGER NOT NULL,\n",
    "  p_name        VARCHAR(22) NOT NULL,\n",
    "  p_mfgr        VARCHAR(6) NOT NULL,\n",
    "  p_category    VARCHAR(7) NOT NULL,\n",
    "  p_brand1      VARCHAR(9) NOT NULL,\n",
    "  p_color       VARCHAR(11) NOT NULL,\n",
    "  p_type        VARCHAR(25) NOT NULL,\n",
    "  p_size        INTEGER NOT NULL,\n",
    "  p_container   VARCHAR(10) NOT NULL\n",
    ");\n",
    "\n",
    "CREATE TABLE supplier \n",
    "(\n",
    "  s_suppkey   INTEGER NOT NULL,\n",
    "  s_name      VARCHAR(25) NOT NULL,\n",
    "  s_address   VARCHAR(25) NOT NULL,\n",
    "  s_city      VARCHAR(10) NOT NULL,\n",
    "  s_nation    VARCHAR(15) NOT NULL,\n",
    "  s_region    VARCHAR(12) NOT NULL,\n",
    "  s_phone     VARCHAR(15) NOT NULL\n",
    ");\n",
    "\n",
    "CREATE TABLE customer \n",
    "(\n",
    "  c_custkey      INTEGER NOT NULL,\n",
    "  c_name         VARCHAR(25) NOT NULL,\n",
    "  c_address      VARCHAR(25) NOT NULL,\n",
    "  c_city         VARCHAR(10) NOT NULL,\n",
    "  c_nation       VARCHAR(15) NOT NULL,\n",
    "  c_region       VARCHAR(12) NOT NULL,\n",
    "  c_phone        VARCHAR(15) NOT NULL,\n",
    "  c_mktsegment   VARCHAR(10) NOT NULL\n",
    ");\n",
    "\n",
    "CREATE TABLE dwdate \n",
    "(\n",
    "  d_datekey            INTEGER NOT NULL,\n",
    "  d_date               VARCHAR(19) NOT NULL,\n",
    "  d_dayofweek          VARCHAR(10) NOT NULL,\n",
    "  d_month              VARCHAR(10) NOT NULL,\n",
    "  d_year               INTEGER NOT NULL,\n",
    "  d_yearmonthnum       INTEGER NOT NULL,\n",
    "  d_yearmonth          VARCHAR(8) NOT NULL,\n",
    "  d_daynuminweek       INTEGER NOT NULL,\n",
    "  d_daynuminmonth      INTEGER NOT NULL,\n",
    "  d_daynuminyear       INTEGER NOT NULL,\n",
    "  d_monthnuminyear     INTEGER NOT NULL,\n",
    "  d_weeknuminyear      INTEGER NOT NULL,\n",
    "  d_sellingseason      VARCHAR(13) NOT NULL,\n",
    "  d_lastdayinweekfl    VARCHAR(1) NOT NULL,\n",
    "  d_lastdayinmonthfl   VARCHAR(1) NOT NULL,\n",
    "  d_holidayfl          VARCHAR(1) NOT NULL,\n",
    "  d_weekdayfl          VARCHAR(1) NOT NULL\n",
    ");\n",
    "CREATE TABLE lineorder \n",
    "(\n",
    "  lo_orderkey          INTEGER NOT NULL,\n",
    "  lo_linenumber        INTEGER NOT NULL,\n",
    "  lo_custkey           INTEGER NOT NULL,\n",
    "  lo_partkey           INTEGER NOT NULL,\n",
    "  lo_suppkey           INTEGER NOT NULL,\n",
    "  lo_orderdate         INTEGER NOT NULL,\n",
    "  lo_orderpriority     VARCHAR(15) NOT NULL,\n",
    "  lo_shippriority      VARCHAR(1) NOT NULL,\n",
    "  lo_quantity          INTEGER NOT NULL,\n",
    "  lo_extendedprice     INTEGER NOT NULL,\n",
    "  lo_ordertotalprice   INTEGER NOT NULL,\n",
    "  lo_discount          INTEGER NOT NULL,\n",
    "  lo_revenue           INTEGER NOT NULL,\n",
    "  lo_supplycost        INTEGER NOT NULL,\n",
    "  lo_tax               INTEGER NOT NULL,\n",
    "  lo_commitdate        INTEGER NOT NULL,\n",
    "  lo_shipmode          VARCHAR(10) NOT NULL\n",
    ");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 3.2 Create tables (with a distribution strategy) in the `dist` schema"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "%%sql\n",
    "\n",
    "CREATE SCHEMA IF NOT EXISTS dist;\n",
    "SET search_path TO dist;\n",
    "\n",
    "-- Drop any previous copies so the cell can be re-run from a clean state\n",
    "DROP TABLE IF EXISTS part cascade;\n",
    "DROP TABLE IF EXISTS supplier;\n",
    "DROP TABLE IF EXISTS customer;\n",
    "DROP TABLE IF EXISTS dwdate;\n",
    "DROP TABLE IF EXISTS lineorder;\n",
    "\n",
    "CREATE TABLE part (\n",
    "  p_partkey     \tinteger     \tnot null\tsortkey distkey,\n",
    "  p_name        \tvarchar(22) \tnot null,\n",
    "  p_mfgr        \tvarchar(6)      not null,\n",
    "  p_category    \tvarchar(7)      not null,\n",
    "  p_brand1      \tvarchar(9)      not null,\n",
    "  p_color       \tvarchar(11) \tnot null,\n",
    "  p_type        \tvarchar(25) \tnot null,\n",
    "  p_size        \tinteger     \tnot null,\n",
    "  p_container   \tvarchar(10)     not null\n",
    ");\n",
    "\n",
    "CREATE TABLE supplier (\n",
    "  s_suppkey     \tinteger        not null sortkey,\n",
    "  s_name        \tvarchar(25)    not null,\n",
    "  s_address     \tvarchar(25)    not null,\n",
    "  s_city        \tvarchar(10)    not null,\n",
    "  s_nation      \tvarchar(15)    not null,\n",
    "  s_region      \tvarchar(12)    not null,\n",
    "  s_phone       \tvarchar(15)    not null)\n",
    "diststyle all;\n",
    "\n",
    "CREATE TABLE customer (\n",
    "  c_custkey     \tinteger        not null sortkey,\n",
    "  c_name        \tvarchar(25)    not null,\n",
    "  c_address     \tvarchar(25)    not null,\n",
    "  c_city        \tvarchar(10)    not null,\n",
    "  c_nation      \tvarchar(15)    not null,\n",
    "  c_region      \tvarchar(12)    not null,\n",
    "  c_phone       \tvarchar(15)    not null,\n",
    "  c_mktsegment      varchar(10)    not null)\n",
    "diststyle all;\n",
    "\n",
    "CREATE TABLE dwdate (\n",
    "  d_datekey            integer       not null sortkey,\n",
    "  d_date               varchar(19)   not null,\n",
    "  d_dayofweek\t      varchar(10)   not null,\n",
    "  d_month      \t    varchar(10)   not null,\n",
    "  d_year               integer       not null,\n",
    "  d_yearmonthnum       integer  \t not null,\n",
    "  d_yearmonth          varchar(8)\tnot null,\n",
    "  d_daynuminweek       integer       not null,\n",
    "  d_daynuminmonth      integer       not null,\n",
    "  d_daynuminyear       integer       not null,\n",
    "  d_monthnuminyear     integer       not null,\n",
    "  d_weeknuminyear      integer       not null,\n",
    "  d_sellingseason      varchar(13)    not null,\n",
    "  d_lastdayinweekfl    varchar(1)    not null,\n",
    "  d_lastdayinmonthfl   varchar(1)    not null,\n",
    "  d_holidayfl          varchar(1)    not null,\n",
    "  d_weekdayfl          varchar(1)    not null)\n",
    "diststyle all;\n",
    "\n",
    "CREATE TABLE lineorder (\n",
    "  lo_orderkey      \t    integer     \tnot null,\n",
    "  lo_linenumber        \tinteger     \tnot null,\n",
    "  lo_custkey           \tinteger     \tnot null,\n",
    "  lo_partkey           \tinteger     \tnot null distkey,\n",
    "  lo_suppkey           \tinteger     \tnot null,\n",
    "  lo_orderdate         \tinteger     \tnot null sortkey,\n",
    "  lo_orderpriority     \tvarchar(15)     not null,\n",
    "  lo_shippriority      \tvarchar(1)      not null,\n",
    "  lo_quantity          \tinteger     \tnot null,\n",
    "  lo_extendedprice     \tinteger     \tnot null,\n",
    "  lo_ordertotalprice   \tinteger     \tnot null,\n",
    "  lo_discount          \tinteger     \tnot null,\n",
    "  lo_revenue           \tinteger     \tnot null,\n",
    "  lo_supplycost        \tinteger     \tnot null,\n",
    "  lo_tax               \tinteger     \tnot null,\n",
    "  lo_commitdate         integer         not null,\n",
    "  lo_shipmode          \tvarchar(10)     not null\n",
    ");"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {
    "toc-hr-collapsed": true
   },
   "source": [
    "# STEP 4: Copying tables \n",
    "\n",
    "Our intent here is to run 5 COPY operations for the 5 tables respectively as shown below.\n",
    "\n",
    "However, we want to accomplish the following:\n",
    "- Make sure that the `DWH_ROLE_ARN` is substituted with the correct value in each query\n",
    "- Perform the data loading twice, once for each schema (dist and nodist)\n",
    "- Collect timing statistics to compare the insertion times\n",
    "\n",
    "Thus, we have scripted the insertion as found below in the function `loadTables` which\n",
    "returns a pandas dataframe containing timing statistics for the copy operations\n",
    "\n",
    "```sql\n",
    "copy customer from 's3://awssampledbuswest2/ssbgz/customer' \n",
    "credentials 'aws_iam_role=<DWH_ROLE_ARN>'\n",
    "gzip region 'us-west-2';\n",
    "\n",
    "copy dwdate from 's3://awssampledbuswest2/ssbgz/dwdate' \n",
    "credentials 'aws_iam_role=<DWH_ROLE_ARN>'\n",
    "gzip region 'us-west-2';\n",
    "\n",
    "copy lineorder from 's3://awssampledbuswest2/ssbgz/lineorder' \n",
    "credentials 'aws_iam_role=<DWH_ROLE_ARN>'\n",
    "gzip region 'us-west-2';\n",
    "\n",
    "copy part from 's3://awssampledbuswest2/ssbgz/part' \n",
    "credentials 'aws_iam_role=<DWH_ROLE_ARN>'\n",
    "gzip region 'us-west-2';\n",
    "\n",
    "copy supplier from 's3://awssampledbuswest2/ssbgz/supplier' \n",
    "credentials 'aws_iam_role=<DWH_ROLE_ARN>'\n",
    "gzip region 'us-west-2';\n",
    "```\n"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4.1 Automate  the copying"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "def loadTables(schema, tables):\n",
    "    \"\"\"Run a timed COPY from S3 for every table in ``tables`` within ``schema``.\n",
    "\n",
    "    Args:\n",
    "        schema: Name of the schema placed on the search_path before loading.\n",
    "        tables: Names of the tables to load; each name is also used as the\n",
    "            final component of the S3 source path.\n",
    "\n",
    "    Returns:\n",
    "        A pandas DataFrame indexed by 'table' with a single column,\n",
    "        'loadtime_<schema>', holding the elapsed seconds of each COPY.\n",
    "    \"\"\"\n",
    "    set_schema_sql = \"SET search_path TO {};\".format(schema)\n",
    "    %sql $set_schema_sql\n",
    "\n",
    "    # Placeholders, in order: target table, S3 key suffix, IAM role ARN.\n",
    "    copy_template = \"\"\"\n",
    "copy {} from 's3://awssampledbuswest2/ssbgz/{}' \n",
    "credentials 'aws_iam_role={}'\n",
    "gzip region 'us-west-2';\n",
    "        \"\"\"\n",
    "    banner = \"======= LOADING TABLE: ** {} ** IN SCHEMA ==> {} =======\"\n",
    "\n",
    "    elapsed_seconds = []\n",
    "    for table_name in tables:\n",
    "        copy_sql = copy_template.format(table_name, table_name, DWH_ROLE_ARN)\n",
    "\n",
    "        print(banner.format(table_name, schema))\n",
    "        print(copy_sql)\n",
    "\n",
    "        started = time()\n",
    "        %sql $copy_sql\n",
    "        took = time() - started\n",
    "        elapsed_seconds.append(took)\n",
    "\n",
    "        print(\"=== DONE IN: {0:.2f} sec\\n\".format(took))\n",
    "\n",
    "    timings = pd.DataFrame({\"table\": tables, \"loadtime_\" + schema: elapsed_seconds})\n",
    "    return timings.set_index('table')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#-- List of the tables to be loaded\n",
    "tables = [\"customer\",\"dwdate\",\"supplier\", \"part\", \"lineorder\"]\n",
    "\n",
    "#-- Load the data twice, once per schema (WARNING!! EACH CAN TAKE MORE THAN 10 MINUTES!!!)\n",
    "nodistStats = loadTables(\"nodist\", tables)\n",
    "distStats = loadTables(\"dist\", tables)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## 4.2 Compare the load performance results"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#-- Plotting of the timing results\n",
    "stats = distStats.join(nodistStats)\n",
    "# Title and unit label let the figure stand on its own when the notebook is skimmed;\n",
    "# the plotted values are the elapsed seconds recorded by loadTables.\n",
    "ax = stats.plot.bar(title=\"COPY load time by table\")\n",
    "ax.set_ylabel(\"seconds\")\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# STEP 5: Compare Query Performance"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "oneDim_SQL =\"\"\"\n",
    "set enable_result_cache_for_session to off;\n",
    "SET search_path TO {};\n",
    "\n",
    "select sum(lo_extendedprice*lo_discount) as revenue\n",
    "from lineorder, dwdate\n",
    "where lo_orderdate = d_datekey\n",
    "and d_year = 1997 \n",
    "and lo_discount between 1 and 3 \n",
    "and lo_quantity < 24;\n",
    "\"\"\"\n",
    "\n",
    "twoDim_SQL=\"\"\"\n",
    "set enable_result_cache_for_session to off;\n",
    "SET search_path TO {};\n",
    "\n",
    "select sum(lo_revenue), d_year, p_brand1\n",
    "from lineorder, dwdate, part, supplier\n",
    "where lo_orderdate = d_datekey\n",
    "and lo_partkey = p_partkey\n",
    "and lo_suppkey = s_suppkey\n",
    "and p_category = 'MFGR#12'\n",
    "and s_region = 'AMERICA'\n",
    "group by d_year, p_brand1\n",
    "\"\"\"\n",
    "\n",
    "drill_SQL = \"\"\"\n",
    "set enable_result_cache_for_session to off;\n",
    "SET search_path TO {};\n",
    "\n",
    "select c_city, s_city, d_year, sum(lo_revenue) as revenue \n",
    "from customer, lineorder, supplier, dwdate\n",
    "where lo_custkey = c_custkey\n",
    "and lo_suppkey = s_suppkey\n",
    "and lo_orderdate = d_datekey\n",
    "and (c_city='UNITED KI1' or\n",
    "c_city='UNITED KI5')\n",
    "and (s_city='UNITED KI1' or\n",
    "s_city='UNITED KI5')\n",
    "and d_yearmonth = 'Dec1997'\n",
    "group by c_city, s_city, d_year\n",
    "order by d_year asc, revenue desc;\n",
    "\"\"\"\n",
    "\n",
    "\n",
    "oneDimSameDist_SQL =\"\"\"\n",
    "set enable_result_cache_for_session to off;\n",
    "SET search_path TO {};\n",
    "\n",
    "select lo_orderdate, sum(lo_extendedprice*lo_discount) as revenue  \n",
    "from lineorder, part\n",
    "where lo_partkey  = p_partkey\n",
    "group by lo_orderdate\n",
    "order by lo_orderdate\n",
    "\"\"\"\n",
    "\n",
    "def compareQueryTimes(schema):\n",
    "    \"\"\"Time the four benchmark queries against ``schema``.\n",
    "\n",
    "    Each query string carries its own session SET statements (result cache off,\n",
    "    search_path), so those statements are part of the measured time.\n",
    "\n",
    "    Args:\n",
    "        schema: Schema name substituted into each query's search_path statement.\n",
    "\n",
    "    Returns:\n",
    "        A pandas DataFrame indexed by 'query' with a single column,\n",
    "        'queryTime_<schema>', holding the elapsed seconds of each query.\n",
    "    \"\"\"\n",
    "    # Keep each label next to its SQL so the two lists cannot drift apart.\n",
    "    benchmarks = [\n",
    "        (\"oneDim\", oneDim_SQL),\n",
    "        (\"twoDim\", twoDim_SQL),\n",
    "        (\"drill\", drill_SQL),\n",
    "        (\"oneDimSameDist\", oneDimSameDist_SQL),\n",
    "    ]\n",
    "\n",
    "    queryTimes = []\n",
    "    for _, query in benchmarks:\n",
    "        # Format before starting the clock so only execution is measured.\n",
    "        q = query.format(schema)\n",
    "        t0 = time()\n",
    "        %sql $q\n",
    "        queryTimes.append(time() - t0)\n",
    "\n",
    "    labels = [label for label, _ in benchmarks]\n",
    "    return pd.DataFrame({\"query\": labels, \"queryTime_\" + schema: queryTimes}).set_index('query')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "noDistQueryTimes = compareQueryTimes(\"nodist\")\n",
    "distQueryTimes   = compareQueryTimes(\"dist\") "
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Side-by-side query timings for the two schemas, indexed by query name.\n",
    "queryTimeDF = noDistQueryTimes.join(distQueryTimes)\n",
    "# Title and unit label let the figure stand on its own; values are elapsed seconds.\n",
    "ax = queryTimeDF.plot.bar(title=\"Query time by schema\")\n",
    "ax.set_ylabel(\"seconds\")\n",
    "plt.show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Relative speed-up of the dist schema over nodist, as a percentage of the nodist time.\n",
    "nodist_times = queryTimeDF['queryTime_nodist']\n",
    "dist_times = queryTimeDF['queryTime_dist']\n",
    "improvementDF = 100.0*(nodist_times-dist_times)/nodist_times\n",
    "queryTimeDF[\"distImprovement\"] = improvementDF\n",
    "improvementDF.plot.bar(title=\"% dist Improvement by query\")\n",
    "plt.show()"
   ]
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
